延續上回 Lab 1 的實作,我們已經創建了一套最基本、用固定窗口計數算法實現的限流器,從過程中可以得知固定窗口是個十分簡單、易於快速開發的算法,但它的局限在於時間窗口的交接點容易出現「流量突刺」,無法平滑的防範突如其來的流量高峰,導致後端會接收到超出預期數量外的請求,而今天要介紹並實作的滑動窗口計數器就是為了解決這問題所衍生出來的限流算法。
相較於固定窗口計數器會在特定的時間點重置,滑動窗口維護的是一個會隨著時間平滑移動的區間,它會預先將時間窗口切分成數個粒度更小的時間桶,每個窗口的請求總數都是時間桶分別累計的結果,而當請求進到限流器時,滑動窗口算法會將最久以前的時間桶移除,並在開頭添加進相同長度的時間桶以持續追蹤請求,由於窗口是連續滑動的,不會在時間的交集點重置,從而避免瞬間的流量爆發,能更準確維持預設的限流速率,提供更穩定、更一致的性能表現。
而在實作具體的限流算法前,我們需要一個擴展策略讓新的算法類別可以整合進當前的系統,所以開頭先來介紹今天 Lab 實作中會使用到的設計模式——「工廠模式」以及「策略模式」。如果我們不做這個擴展策略,在 @Aspect
切面的類別就需要根據新加入的算法擴增對應的判斷去處理,類似像下面這樣,一旦限流算法越來越多,需要注入的依賴也越多,耦合性也就越高,更不用說程式碼的改動範圍也越大:
// 略...
private final FixedWindowLimiter fixedWindowLimiter;
private final SlidingWindowLimiter slidingWindowLimiter;
@Around("@annotation(rateLimiter)")
public Object around(ProceedingJoinPoint joinPoint, RateLimiter rateLimiter) throws Throwable {
var key = rateLimiterKeyGenerator.generateKey(joinPoint, rateLimiter);
var isAllowed = true;
// 沒有盡頭的擴展下去......
switch (rateLimiter.algorithm()) {
case FIXED_WINDOW:
isAllowed = fixedWindowLimiter.isAllowed(key, rateLimiter.limit(), rateLimiter.window());
break;
case SLIDING_WINDOW:
isAllowed = slidingWindowLimiter.isAllowed(key, rateLimiter.limit(), rateLimiter.window());
break;
default:
break;
}
if (!isAllowed) {
// 略...
}
log.debug("Rate limit passed for key: {}", key);
return joinPoint.proceed();
}
為了優化上述切面無止境擴展的問題,我們可以引入一個工廠類,在這個工廠創建時,它的建構子會去搜集所有實作 RateLimiterStrategy
這個策略介面的類別創建一個 Map 資料結構,用各個算法當作 key,具體的算法實作類作為 value,可以看作這個工廠的設備已經搭建好:
private final Map<Algorithm, RateLimiterStrategy> strategies;
public RateLimiterFactory(List<RateLimiterStrategy> strategyList) {
this.strategies = strategyList.stream()
.collect(Collectors.toMap(
RateLimiterStrategy::getAlgorithmType,
Function.identity()
));
}
接著再建立一個工廠流水線,當送入對應的算法原料時,流水線就會吐出一個對應的算法實作類別給切面:
public RateLimiterStrategy getStrategy(Algorithm algorithm) {
var startegy = strategies.get(algorithm);
return Optional.ofNullable(startegy)
.orElseThrow(() -> new BaseException(StatusCode.UNKNOW_ERR, "Rate limiter strategy not found"));
}
整個工廠蓋起來會長成這樣:
@Slf4j
@Component
public class RateLimiterFactory {
private final Map<Algorithm, RateLimiterStrategy> strategies;
public RateLimiterFactory(List<RateLimiterStrategy> strategyList) {
this.strategies = strategyList.stream()
.collect(Collectors.toMap(
RateLimiterStrategy::getAlgorithmType,
Function.identity()
));
}
public RateLimiterStrategy getStrategy(Algorithm algorithm) {
var startegy = strategies.get(algorithm);
return Optional.ofNullable(startegy)
.orElseThrow(() -> new BaseException(StatusCode.UNKNOW_ERR, "Rate limiter strategy not found"));
}
}
總結來說工廠模式強調的是:
策略模式的核心概念是定義一系列算法,並將每個算法封裝起來,使他們可以相互替換,避免在選擇算法時用大量的 if-else 跟 switch-case 分支判斷,讓算法的選擇可以跟具體實作類別分離。在策略模式中主要的三個核心角色:
一旦定義好策略介面後,就可以讓具體的算法類實作:
public interface RateLimiterStrategy {
boolean isAllow(String key, RateLimiter rateLimiter);
Algorithm getAlgorithmType();
}
實作完只需要在客戶端的類別注入介面即可,工廠不需要知道具體要去取得什麼算法,只需要傳參數進去等待介面返回對應的實例就好,這邊很好地利用了物件導向的多型概念,工廠拿到的都會是 RateLimiterStrategy
這個類別,但具體實作卻可以自由變化,這些變化完全被封裝起來了。最後再注入工廠類到切面,切面完全也不用知道工廠的底層,只管接收對應的策略就好,下面的優化完全不用加入 if-else 或 switch-case 判斷,之後要新增算法直接創建新的策略實例即可:
private final RateLimiterKeyGenerator rateLimiterKeyGenerator;
private final RateLimiterFactory rateLimiterFactory;
@Around("@annotation(rateLimiter)")
public Object around(ProceedingJoinPoint joinPoint, RateLimiter rateLimiter) throws Throwable {
var key = rateLimiterKeyGenerator.generateKey(joinPoint, rateLimiter);
var strategy = rateLimiterFactory.getStrategy(rateLimiter.algorithm());
var isAllowed = strategy.isAllow(key, rateLimiter);
if (!isAllowed) {
var method = joinPoint.getSignature().toShortString();
log.warn("Rate limit exceed for method: {}, key: {}", method, key);
throw new BaseException(StatusCode.TOO_MANY_REQUEST,
String.format("Rate limit exceeded. Max %d requests per %d seconds",
rateLimiter.limit(), rateLimiter.window()));
}
log.debug("Rate limit passed for key: {}", key);
return joinPoint.proceed();
}
這樣整條限流器供應鏈就搭建完成了,每個請求就像一個客戶走進零售店 (API),收銀員根據客戶提出的要求,打出訂單送到後台,後台在做之前會先經過老闆娘的法眼 (Aspect),她會確認訂單是哪種服務,再把這個服務的種類 (Alogrithm) 送到人資那邊,人資 (Factory) 直接把這種類原封不動 pass 給開發部 (Strategy),開發主管看到了這是哪種東西,就把負責的開發人員推出去「你去搞定吧,我不管了」,人資抓到負責人以後又跟開發說「你去告訴老闆娘這件事可不可做吧,我不管了」,開發就跑去老闆娘面前說「這事目前人手不足,先緩緩吧,過 10 分鐘才能安排」,老闆娘於是打去前台說「叫客人晚點再來吧」,這整件事的核心在於——「所有人都只要一個結果」,根本不在乎過程,不在乎其他人是怎麼得到這些資訊的,這有個好處是,假設這家店之後新增了 10 個品項,那只需要新增 10 個開發人員並把他們編列開發主管底下就好,其他的組織架構完全不用異動,按照同樣的模式一樣可以得到預期的結果。
實作重點:
在計數器中增加滑動窗口的資料結構。
這邊特別有兩個概念,首先是需要清楚理解時間桶的概念是什麼,以及他是如何被定義、運用什麼計算方式將它切分出來,其二是為了儲存這個時間桶與其相對應的計數,該怎麼選擇適合的資料結構,而這資料結構將有別於固定窗口計數器的儲存器那麼單純。
創建一個新的滑動窗口策略實作類。
先用高層次理解滑動窗口算法裡「時間桶」的概念,我們在實作滑動窗口前會先定義一個時間片段的大小,舉例來說這行常數 SLIDING_WIN_BUCKET_SIZE_SECS
設為 10,意思是說假設限流器的窗口是 60 秒可接受 200 個請求,就會劃分成 6 個時間桶,當前請求進來取得當下時間戳後會「向下取整」(timestamp / SLIDING_WIN_BUCKET_SIZE_SECS) * SLIDING_WIN_BUCKET_SIZE_SECS;
,看是要把請求歸類到哪個時間區段,就在那個時間桶上加 1,每次進來限流器都是回推過去 60 秒所有時間桶加總的請求數,而不是很死板地用固定的時間窗口判斷,如果過去 60 秒內所有時間桶的總和已經達到 200 個請求,此時新的請求就會被拒絕,就算剛好在時間窗口的交集點也不會發生流量突刺,而是很平滑地防護所有預期外的流量,以下是計數器的實現邏輯:
private static final int SLIDING_WIN_BUCKET_SIZE_SECS = 10;
定義了時間桶的大小。ConcurrentHashMap<String, ConcurrentSkipListMap<Long, AtomicLong>> slidingWindowStorage
定義了每個 key 的請求計數器。ConcurrentSkipListMap<Long, AtomicLong>
計數器裡的結構是各個時間桶的統計數據,key 是用時間戳定義的時間桶,value 就是這個時間桶的請求累計次數。private static final ConcurrentHashMap<String, ConcurrentSkipListMap<Long, AtomicLong>> slidingWindowStorage = new ConcurrentHashMap<>();
private static final int SLIDING_WIN_BUCKET_SIZE_SECS = 10;
@Override
public long addToSlidingWindow(String key, long timestamp, int windowSeconds) {
var bucketTimestamp = (timestamp / SLIDING_WIN_BUCKET_SIZE_SECS) * SLIDING_WIN_BUCKET_SIZE_SECS;
var timeSeries = slidingWindowStorage.computeIfAbsent(key, k -> new ConcurrentSkipListMap<>());
var bucketCount = timeSeries.computeIfAbsent(bucketTimestamp, k -> new AtomicLong(0));
bucketCount.incrementAndGet();
var expireTime = timestamp - windowSeconds;
timeSeries.headMap(expireTime).clear();
return getCountInSlidingWindow(key, timestamp - windowSeconds, timestamp);
}
一開始計算出當前時間戳對應的時間桶 bucketTimestamp
,接著取得這個 key 所有的時間桶 timeSeries
,再用計算出的當前時間桶取出累計的請求數 bucketCount
並++。
var bucketTimestamp = (timestamp / SLIDING_WIN_BUCKET_SIZE_SECS) * SLIDING_WIN_BUCKET_SIZE_SECS;
var timeSeries = slidingWindowStorage.computeIfAbsent(key, k -> new ConcurrentSkipListMap<>());
var bucketCount = timeSeries.computeIfAbsent(bucketTimestamp, k -> new AtomicLong(0));
bucketCount.incrementAndGet();
接著,平滑移動、更新時間窗口,用當下時間戳往前推 60 秒計算出有效時間範圍,expireTime
這代表著時間窗口的最末端,然後用 headMap
可以將 expireTime
最末端以前的所有數據清除,如此一來最新的時間桶序列就更新完成了,然後再去把時間範圍內的所有請求次數統計出來給限流器判斷就大功告成!
var expireTime = timestamp - windowSeconds;
timeSeries.headMap(expireTime).clear();
getCountInSlidingWindow(key, expireTime, timestamp);
完整的實作如下,包含了前一版固定窗口的實作,另外還有 key 的清除策略,基本上清除的邏輯也都是運用 ConcurrentSkipListMap
提供的 headMap
方法來清除時間窗口最末端以前的數據,這邊就不展開說明了,因為我自己體感下來前面更新時間窗口、推算時間桶的邏輯相對比較難理解,尤其初次應用到 ConcurrentSkipListMap
這個資料結構,時間桶概念又很抽象,所以前面的過程假設通了,後面清除邏輯其實很好懂。
清理過期時間桶是為了避免記憶體無限增長,只保留滑動窗口範圍內的數據即可。
至於為何使用 ConcurrentSkipListMap
這個資料結構,是因為只要 key 的類型有實作 Comparable 介面,它就會在儲存的當下自動為 key 做排序,這對於更新時間窗口來說很方便。
@Component
public class MemoryRateLimiterStorage implements RateLimiterStorage {
private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private static final ConcurrentHashMap<String, ConcurrentSkipListMap<Long, AtomicLong>> slidingWindowStorage = new ConcurrentHashMap<>();
private static final int SLIDING_WIN_BUCKET_SIZE_SECS = 10;
public MemoryRateLimiterStorage() {
scheduler.scheduleAtFixedRate(this::cleanExpireKey, 30, 30, TimeUnit.SECONDS);
}
@Override
public long addToSlidingWindow(String key, long timestamp, int windowSeconds) {
var bucketTimestamp = (timestamp / SLIDING_WIN_BUCKET_SIZE_SECS) * SLIDING_WIN_BUCKET_SIZE_SECS;
var timeSeries = slidingWindowStorage.computeIfAbsent(key, k -> new ConcurrentSkipListMap<>());
var bucketCount = timeSeries.computeIfAbsent(bucketTimestamp, k -> new AtomicLong(0));
bucketCount.incrementAndGet();
var expireTime = timestamp - windowSeconds;
timeSeries.headMap(expireTime).clear();
return getCountInSlidingWindow(key, expireTime, timestamp);
}
@Override
public long getCountInSlidingWindow(String key, long windownStartTime, long windowEndTime) {
var timeSeries = slidingWindowStorage.get(key);
if (timeSeries == null) {
return 0;
}
return timeSeries.subMap(windownStartTime, true, windowEndTime, true)
.values()
.stream()
.mapToLong(AtomicLong::get)
.sum();
}
@Override
public void cleanupExpiredWindows(String key, long expireTime) {
var timeSeries = slidingWindowStorage.get(key);
if (timeSeries != null) {
timeSeries.headMap(expireTime).clear();
if (timeSeries.isEmpty()) {
slidingWindowStorage.remove(key);
}
}
}
private void cleanExpireKey() {
var currentTime = System.currentTimeMillis() / 1000;
var expireTime = currentTime - 1000;
// fixed window
storage.entrySet().removeIf(entry -> entry.getValue().isExpired());
// sliding window
slidingWindowStorage.forEach((key, timeSeries) -> {
timeSeries.headMap(expireTime).clear();
});
slidingWindowStorage.entrySet().removeIf((entry -> entry.getValue().isEmpty()));
}
private record Counter(AtomicLong count, long expireTime) {
public boolean isExpired() {
return System.currentTimeMillis() > expireTime;
}
}
}
最後再把實作好的計數器邏輯,注入並應用到滑動窗口的策略實例即可,這樣切面找工廠拿方法時,策略模式就會吐出一個滑動窗口的實例 SlidingWindowCounterLimiter
來判斷限流邏輯囉!
@RequiredArgsConstructor
@Component
public class SlidingWindowCounterLimiter implements RateLimiterStrategy {
private final RateLimiterStorage storage;
@Override
public boolean isAllow(String key, RateLimiter rateLimiter) {
var currentTimestamp = System.currentTimeMillis() / 1000;
var currentCount = storage.addToSlidingWindow(key, currentTimestamp, rateLimiter.window());
return currentCount <= rateLimiter.limit();
}
@Override
public Algorithm getAlgorithmType() {
return Algorithm.SLIDING_WINDOW_COUNTER;
}
}
明天實作預計實作 Token Bucket 的限流算法。
話說不知為何 code block 都好像有點跑版,如果造成閱讀不適還請見諒,我再看看後續怎麼優化。